/**
 * Copyright 2005 The Apache Software Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred;

import java.io.IOException;
import java.io.File;
import java.util.Arrays;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.conf.Configuration;

/** An {@link org.apache.hadoop.mapred.OutputFormat} that writes {@link org.apache.hadoop.io.MapFile}s. */
public class MapFileOutputFormat extends OutputFormatBase {

  public RecordWriter getRecordWriter(FileSystem fs, JobConf job,
                                      String name) throws IOException {

    File file = new File(job.getOutputDir(), name);

    final MapFile.Writer out =
      new MapFile.Writer(fs, file.toString(),
                         job.getOutputKeyClass(),
                         job.getOutputValueClass(),
                         job.getBoolean("mapred.output.compress", false));

    return new RecordWriter() {

        public void write(WritableComparable key, Writable value)
          throws IOException {

          out.append(key, value);
        }

        public void close(Reporter reporter) throws IOException { out.close();}
      };
  }

  /** Open the output generated by this format. */
  public static MapFile.Reader[] getReaders(FileSystem fs, File dir, Configuration conf)
    throws IOException {
    File[] names = fs.listFiles(dir);
    
    // sort names, so that hash partitioning works
    Arrays.sort(names);
    
    MapFile.Reader[] parts = new MapFile.Reader[names.length];
    for (int i = 0; i < names.length; i++) {
      parts[i] = new MapFile.Reader(fs, names[i].toString(), conf);
    }
    return parts;
  }
    
  /** Get an entry from output generated by this class. */
  public static Writable getEntry(MapFile.Reader[] readers,
                                  Partitioner partitioner,
                                  WritableComparable key,
                                  Writable value) throws IOException {
    int part = partitioner.getPartition(key, value, readers.length);
    return readers[part].get(key, value);
  }

}

